feat: add native Parquet file reading support via extension #71
BingqingLyu wants to merge 16 commits into alibaba:main from
Conversation
```cpp
ParquetParseOptions parquetOpts;
ReadOptions readOpts;

// Configure Parquet-specific reader properties
auto reader_properties = std::make_shared<parquet::ReaderProperties>();

// Enable buffered stream if configured
if (parquetOpts.buffered_stream.get(options)) {
  reader_properties->enable_buffered_stream();
}

// Set I/O buffer size in bytes
int64_t buffer_size = readOpts.batch_size.get(options);
reader_properties->set_buffer_size(buffer_size);

fragment_scan_options->reader_properties = reader_properties;

// Configure Arrow-specific reader properties
auto arrow_reader_properties = std::make_shared<parquet::ArrowReaderProperties>();

// Set Arrow row batch size (number of rows per batch)
int64_t row_batch_size = parquetOpts.row_batch_size.get(options);
arrow_reader_properties->set_batch_size(row_batch_size);

// Use threads setting from general read options
arrow_reader_properties->set_use_threads(readOpts.use_threads.get(options));

// Configure pre-buffering for high-latency filesystems
arrow_reader_properties->set_pre_buffer(parquetOpts.pre_buffer.get(options));

// Configure caching of decompressed data
if (parquetOpts.cache_decompressed.get(options)) {
  arrow_reader_properties->set_cache_options(
      arrow::io::CacheOptions::LazyDefaults());
} else {
  arrow_reader_properties->set_cache_options(
      arrow::io::CacheOptions::Defaults());
}

fragment_scan_options->arrow_reader_properties = arrow_reader_properties;

return fragment_scan_options;
```
use_embedded_schema option declared but never read
ParquetParseOptions defines use_embedded_schema (default: true) and it is documented in both parquet_options.h and load_parquet.md as a supported option, but buildFragmentOptions() never calls parquetOpts.use_embedded_schema.get(options). Every other option (buffered_stream, pre_buffer, cache_decompressed, row_batch_size) is consumed, but this one is completely ignored, so passing use_embedded_schema=false to LOAD FROM has no observable effect.
The option should either be wired into the Arrow ParquetFragmentScanOptions / ArrowReaderProperties (e.g., to decide whether to trust the embedded schema vs. infer from data), or removed from the struct, the header doc, and the user-facing documentation.
Fixed in ff02825. Removed the use_embedded_schema option entirely — it was declared but never read by buildFragmentOptions(). Arrow's ParquetFileFormat uses the embedded Parquet schema by default and provides no straightforward API to override this at the ParquetFragmentScanOptions level. The option has been removed from ParquetParseOptions, the header doc comment, and load_parquet.md.
```cpp
// Configure caching of decompressed data
if (parquetOpts.cache_decompressed.get(options)) {
  arrow_reader_properties->set_cache_options(
      arrow::io::CacheOptions::LazyDefaults());
} else {
  arrow_reader_properties->set_cache_options(
      arrow::io::CacheOptions::Defaults());
}
```
cache_decompressed semantic mismatch with Arrow's CacheOptions
The option is documented as "Cache decompressed column chunks to accelerate repeated reads", but ArrowReaderProperties::set_cache_options() configures Arrow's I/O hole-filling cache (read coalescing), not decompressed column chunk caching.
Additionally, the mapping may be logically inverted: cache_decompressed=true applies CacheOptions::LazyDefaults() (lazy=true), which means the cache only loads bytes that are explicitly requested (i.e., does not pre-fetch). CacheOptions::Defaults() (lazy=false) fetches data more eagerly. If the intent is to "cache aggressively for repeated reads", the non-lazy (Defaults) option would be more appropriate when cache_decompressed=true.
Consider:

- Renaming the option to something that more accurately reflects what it controls (e.g., `enable_io_coalescing` or `lazy_io_cache`), and updating the documentation accordingly.
- Verifying whether `lazy=true` or `lazy=false` actually delivers the desired read-performance behaviour for the intended use-case.
Fixed in ff02825. Renamed cache_decompressed to enable_io_coalescing to accurately reflect what arrow::io::CacheOptions controls — this is Arrow's I/O read coalescing (hole-filling cache) for non-contiguous byte ranges, not decompressed column chunk caching. The option key is now ENABLE_IO_COALESCING, with updated comments in parquet_options.h, parquet_options.cc, and load_parquet.md. The lazy/eager semantics (LazyDefaults vs Defaults) are preserved as-is, since lazy coalescing (only load explicitly requested ranges) is the natural default when coalescing is enabled.
```cpp
static execution::Context execFunc(
    std::shared_ptr<reader::ReadSharedState> state) {
  // Get file system from provider
  LocalFileSystemProvider fsProvider;
  auto fileInfo = fsProvider.provide(state->schema.file);
  state->schema.file.paths = fileInfo.resolvedPaths;

  // Create Parquet-specific options builder
  auto optionsBuilder =
      std::make_unique<reader::ArrowParquetOptionsBuilder>(state);

  // Create Arrow reader with Parquet options
  auto reader = std::make_unique<reader::ArrowReader>(
      state, std::move(optionsBuilder), fileInfo.fileSystem);

  // Execute read operation
  execution::Context ctx;
  auto localState = std::make_shared<reader::ReadLocalState>();
  reader->read(localState, ctx);
  return ctx;
}
```
execFunc silently ignores errors from reader->read()
The read() call on line 66 populates ctx but does not return a status. If ArrowReader::read() internally encounters an Arrow error (e.g., corrupted file, schema mismatch), errors may only surface as logged warnings rather than propagated exceptions, leaving ctx in a partially populated state that the caller cannot distinguish from a successful read.
Comparing with the sniffFunc below (lines 97–101), which explicitly checks sniffResult and throws on failure, execFunc has no such guard. It is worth confirming that ArrowReader::read() always throws on failure (rather than logging and continuing) so that errors are not silently swallowed.
Confirmed in ff02825. Added an explicit comment in execFunc to document this. ArrowReader::read() propagates all errors via exceptions (THROW_IO_EXCEPTION / THROW_INVALID_ARGUMENT_EXCEPTION) — see src/utils/reader/reader.cc where every Arrow Result and Status failure path checks .ok() and throws. There is no silent swallowing path.
```cmake
  # Point Arrow to use the project's RapidJSON
  set(RapidJSON_ROOT "${CMAKE_SOURCE_DIR}/third_party/rapidjson" CACHE PATH "" FORCE)
endif()
set(ARROW_WITH_ZLIB OFF CACHE BOOL "" FORCE)
set(ARROW_ENABLE_THREADING ON CACHE BOOL "" FORCE)
```
cmake_policy(SET CMP0169 OLD) suppresses a CMake 4.x deprecation warning but defers the real fix
Setting CMP0169 to OLD silences the deprecation of the legacy FetchContent_Populate() pattern, but CMake 4.x treats that pattern as an error by default, so the workaround must stay in sync with future CMake releases. The FetchContent_MakeAvailable() API (available since CMake 3.14) is the recommended migration path.
This is acceptable as a short-term compatibility fix, but a follow-up to migrate away from FetchContent_Populate() should be tracked, especially since the project already handles CMake 4.x incompatibilities in other parts of this patch.
Acknowledged. The cmake_policy(SET CMP0169 OLD) is an intentional short-term compatibility shim to unblock CMake 4.x builds. Migrating the affected FetchContent_Populate() calls to FetchContent_MakeAvailable() is tracked as a follow-up — the current patch keeps the existing FetchContent call sites working without regressions while the proper migration is planned separately.
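A possible shape for that follow-up migration, sketched with a placeholder URL/version (the project's actual Arrow pin and declaration names may differ); `SOURCE_SUBDIR` requires CMake >= 3.18:

```cmake
include(FetchContent)

# Placeholder declaration; URL and version are illustrative, not the project's pins.
FetchContent_Declare(
  arrow
  URL https://github.com/apache/arrow/archive/refs/tags/apache-arrow-15.0.0.tar.gz
  SOURCE_SUBDIR cpp   # build Arrow's cpp/ subdirectory (CMake >= 3.18)
)

# Legacy pattern (an error under CMake 4.x unless CMP0169 is OLD):
#   FetchContent_Populate(arrow)
#   add_subdirectory(${arrow_SOURCE_DIR}/cpp ${arrow_BINARY_DIR})

# Recommended replacement: populate and add_subdirectory in one call.
FetchContent_MakeAvailable(arrow)
```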
Addressed all review comments in ff02825:

- Remove use_embedded_schema option (was declared but never read by buildFragmentOptions(); Arrow always uses the embedded Parquet schema by default and provides no simple API to override this)
- Rename cache_decompressed -> enable_io_coalescing to accurately reflect what arrow::io::CacheOptions controls (I/O hole-filling / read coalescing, not decompressed column chunk caching); update comments and docs accordingly
- Add comment in execFunc clarifying that ArrowReader::read() propagates all errors via exceptions, not silent logging
- Remove TestOptionsTranslation_BufferedStream test which could not meaningfully verify the disabled case (Arrow exposes no getter for is_buffered_stream_enabled); rename TestOptionsTranslation_CacheOptions -> TestOptionsTranslation_IoCoalescing and update option keys to match the renamed ENABLE_IO_COALESCING option
```yaml
  # ============================================================
  extension_tests_default:
    runs-on: [self-hosted]
    container:
```
- Should trigger neug-extension-test.yml at least for this PR.
- We should first run neug-extension-test.yml with image: 0.1.0-nightly. If it works fine, retag it to 0.1.0 and push. cc @lnfjpt
```cmake
# Configure Arrow Parquet support if parquet extension is enabled
if(BUILD_EXTENSIONS AND "parquet" IN_LIST BUILD_EXTENSIONS)
  set(ARROW_PARQUET ON CACHE BOOL "" FORCE)
```
ARROW_PARQUET is itself a guard in Arrow's CMakeLists; maybe we should use a different name here, e.g. ARROW_ENABLE_PARQUET.
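One way the rename could look, sketched with hypothetical wiring that keeps a project-level switch distinct from Arrow's own `ARROW_PARQUET` cache variable and forwards to it only when configuring the vendored Arrow build:

```cmake
# Project-level switch, distinct from Arrow's internal ARROW_PARQUET option.
option(ARROW_ENABLE_PARQUET "Build Arrow with Parquet support for the parquet extension" OFF)

if(BUILD_EXTENSIONS AND "parquet" IN_LIST BUILD_EXTENSIONS)
  set(ARROW_ENABLE_PARQUET ON)
endif()

if(ARROW_ENABLE_PARQUET)
  # Forward to Arrow's own guard when configuring the vendored Arrow build.
  set(ARROW_PARQUET ON CACHE BOOL "" FORCE)
endif()
```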
What do these changes do?
This PR adds native support for reading Apache Parquet format files in NeuG through a new Parquet extension. Users can now directly load graph data from Parquet files without requiring format conversion, integrating seamlessly with NeuG's existing data import mechanisms.
Related issue number
Fixes #56
Greptile Summary
This PR adds a native Apache Parquet file reading extension to NeuG, enabling users to directly `LOAD FROM` Parquet files without format conversion. The implementation follows NeuG's existing JSON extension pattern, introducing `ArrowParquetOptionsBuilder` (extending `ArrowOptionsBuilder`), a `ParquetReadFunction` with `execFunc`/`sniffFunc`, and an `Init()`/`Name()` entry point — all wired into the CMake build system with conditional Arrow Parquet/Snappy/Zlib support and CMake 4.x compatibility patches.

Key issues found:

- `use_embedded_schema` option is a no-op: The option is declared in `ParquetParseOptions`, prominently documented in both `parquet_options.h` and `load_parquet.md`, but `buildFragmentOptions()` never calls `parquetOpts.use_embedded_schema.get(options)`. Passing `use_embedded_schema=false` to `LOAD FROM` has absolutely no effect, which will mislead users.
- `TestOptionsTranslation_BufferedStream` provides false test coverage: The test for the `BUFFERED_STREAM=false` path asserts only that `reader_properties != nullptr`, which is always true. The test body even acknowledges Arrow has no getter for the buffered-stream flag, yet the test is not removed or marked as a limitation, giving false confidence this path is verified.
- `cache_decompressed` semantics are misleading: The option is documented as "Cache decompressed column chunks to accelerate repeated reads" but it actually configures Arrow's I/O hole-filling (read coalescing) cache via `set_cache_options()`. The mapping may also be logically inverted: `cache_decompressed=true` maps to `LazyDefaults()` (lazy=true, on-demand fetching) rather than the more eager `Defaults()` (lazy=false).
- `CMP0169 OLD` defers migration: Setting `CMP0169 OLD` suppresses the CMake 4.x error for `FetchContent_Populate()` but does not migrate to the recommended `FetchContent_MakeAvailable()` pattern; this should be tracked as a follow-up.

Confidence Score: 3/5
- A documented option (`use_embedded_schema`) is completely non-functional, which blocks merging without either fixing the implementation or removing the option from the docs and API.
- The `use_embedded_schema` option being silently non-functional is a clear correctness gap that will confuse users. Combined with the misleading `cache_decompressed` semantics and a test that provides false coverage for `BUFFERED_STREAM=false`, the implementation has enough quality issues to warrant revisions before merging.
- `extension/parquet/src/parquet_options.cc` requires the most attention — `use_embedded_schema` must be either implemented or removed, and the `cache_decompressed`/`CacheOptions` mapping should be clarified. `extension/parquet/tests/parquet_test.cpp` also needs the `BufferedStream` test corrected.

Important Files Changed
- `use_embedded_schema` option is declared and documented but never read/applied in `buildFragmentOptions()`. Also contains a potential semantic inversion in `cache_decompressed` logic mapping to Arrow's `CacheOptions`.
- `ParquetParseOptions` and `ArrowParquetOptionsBuilder`: well-structured and follows the existing CSV builder pattern, but `use_embedded_schema` is declared here yet never used in the implementation.
- `ParquetReadFunction` with `execFunc` and `sniffFunc`: pattern mirrors the JSON extension correctly. `sniffFunc` has proper error handling; `execFunc` error handling depends on `ArrowReader::read()` always throwing on failure.
- `ParquetReadFunction` and extension metadata: correctly mirrors the JSON extension pattern with proper exception handling.
- `TestOptionsTranslation_BufferedStream` test does not actually verify the disabled state and provides false assurance, as Arrow doesn't expose a getter for buffered-stream status.
- `FetchContent_Populate()` and Arrow's `ThirdpartyToolchain.cmake`: the `CMP0169 OLD` policy suppresses an error but defers the real migration to `FetchContent_MakeAvailable()`.
- `NEUG_RUN_PARQUET_TESTS` environment variable following the JSON test pattern.
- Documents `use_embedded_schema` as a supported option, but this option currently has no effect in the implementation, which would mislead users.

Sequence Diagram
```mermaid
sequenceDiagram
    participant User as User (Cypher)
    participant Ext as ParquetExtension (Init/Name)
    participant RF as ParquetReadFunction
    participant FSP as LocalFileSystemProvider
    participant OB as ArrowParquetOptionsBuilder
    participant AR as ArrowReader
    participant Arrow as Apache Arrow Dataset API

    User->>Ext: LOAD PARQUET / INSTALL PARQUET
    Ext->>RF: registerFunction(ParquetReadFunction)
    User->>RF: LOAD FROM "file.parquet" RETURN *
    RF->>FSP: provide(fileSchema)
    FSP-->>RF: resolvedPaths + fileSystem
    alt Schema inference (sniffFunc)
        RF->>OB: ArrowParquetOptionsBuilder(state)
        RF->>AR: ArrowReader(state, optionsBuilder, fs)
        AR->>Arrow: inferSchema()
        Arrow-->>AR: Arrow Schema
        AR-->>RF: EntrySchema
    end
    RF->>OB: build()
    OB->>OB: buildFragmentOptions()<br/>(buffered_stream, pre_buffer,<br/>cache_decompressed, row_batch_size)
    OB->>OB: buildFileFormat(scanOptions)<br/>(ParquetFileFormat + default_fragment_scan_options)
    OB-->>RF: ArrowOptions(scanOptions, fileFormat)
    RF->>AR: read(localState, ctx)
    AR->>Arrow: createScanner(fs)
    Arrow-->>AR: Scanner
    AR->>Arrow: full_read / batch_read
    Arrow-->>AR: RecordBatches
    AR-->>RF: execution::Context
    RF-->>User: Result rows
```

Comments Outside Diff (1)
extension/parquet/tests/parquet_test.cpp, line 1187-1210 (link)

`TestOptionsTranslation_BufferedStream` does not actually verify the disabled case

The test sets `BUFFERED_STREAM=false` and then its only assertion is that `reader_properties != nullptr`. This assertion is always true regardless of whether buffered stream is enabled or disabled (the previous test suite already verifies `reader_properties` is non-null with default options). The comment in the test even acknowledges this: "Arrow doesn't expose a getter for is_buffered_stream_enabled".

Because `parquet::ReaderProperties` does not expose a getter for the buffered-stream flag, there is no meaningful assertion available for this branch. As written, this test gives false confidence that the `BUFFERED_STREAM=false` path is exercised correctly.

Last reviewed commit: 1fb1d56